/*
 * MIT License
 *
 * Copyright (c) 2020 wen.gu <454727014@qq.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

 /***************************************************************************
 * Name: ngl_server_general.c
 *
 * Purpose: default (general) implementation of the ngl server plugin
 *
 * Developer:
 *   wen.gu , 2023-04-24
 *
 * TODO:
 *
 ***************************************************************************/
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>

#include "ngl/server/ngl_server_general.h"
#include "ngl/ngl_message_parser.h"

#define SERVER_SELECT_TIMEOUT_MS 40

#define SERVER_RECV_BUF_SIZE (1024 * 10)
/***************************************************************************
 * Definitions of server_general class
 ***************************************************************************/

/* Forward declaration so a connection can point back at its owning server. */
typedef struct _general_server_s general_server_t;

/**
 * One accepted client connection.
 *
 * Connections form a singly linked list headed by general_server_t::connections
 * and each one is serviced by its own receiver thread.
 */
typedef struct _ngl_connection_s {
    /* Next connection in the server's list, or NULL for the last entry. */
    struct _ngl_connection_s* next;
    /* Run flag polled by the receiver thread; cleared by server_connection_destroy(). */
    ngl_bool_t is_running;
    /* Transport of this client; released by the receiver thread when it exits. */
    ngl_transport_t* transport;
    /* Handle of the receiver thread (created detached in server_connection_add()). */
    pthread_t on_receiver_worker;
    /* Back-pointer to the server that accepted this connection. */
    general_server_t* server_ref;
    /* Parser fed with received bytes; it reports messages to server_connection_on_receive(). */
    ngl_message_parser_t parser;
} ngl_connection_t;

/**
 * State of the general server.
 */
struct _general_server_s {
    /* Base object. Must stay the first member: the code casts ngl_server_t* to general_server_t*. */
    ngl_server_t supper;
    /* Handle of the thread that accepts client connections; non-zero is read as "started". */
    pthread_t on_connect_worker;
    /* Run flag polled by the accept thread. */
    ngl_bool_t is_running;
    /* Head of the singly linked list of client connections. */
    ngl_connection_t* connections;
} ;

/***************************************************************************
 * inner function  implementation
 ***************************************************************************/

/**
 * Ask a connection's receiver thread to shut down.
 *
 * Only the run flag is cleared and the thread handle forgotten. The transport,
 * the parser and the connection object itself are released by
 * server_connection_receiver_worker() when it leaves its loop, so they are
 * deliberately not freed here: the worker is detached and may still be using
 * them.
 *
 * @param connection connection to stop; must not be NULL.
 */
static void server_connection_destroy(ngl_connection_t* connection) {
    if (connection->on_receiver_worker) {
        connection->is_running = NGL_FALSE;
        /* pthread_t is an integer type on some platforms (e.g. glibc), where
         * assigning NULL is a pointer-to-integer conversion. A cast zero is
         * valid for both integer and pointer representations. */
        connection->on_receiver_worker = (pthread_t)0;
    }
}

/**
 * Unlink the connection whose transport uses descriptor @p fd from the
 * server's list and ask it to stop.
 *
 * @param server server owning the connection list.
 * @param fd     descriptor identifying the connection.
 * @return NGL_TRUE if a matching connection was found and unlinked,
 *         NGL_FALSE otherwise.
 *
 * NOTE(review): the list is not protected by a lock although this function is
 * reached from receiver threads; confirm the intended synchronization.
 */
static ngl_bool_t server_connection_remove(general_server_t* server, int32_t fd) {
    /* Iterate over the address of each link rather than over nodes, so the
     * head entry (server->connections) is unlinked exactly like any other
     * entry. The previous prev/temp walk started with prev == temp and
     * therefore never detached the first node. */
    ngl_connection_t** link = &server->connections;

    while (*link != NULL) {
        ngl_connection_t* current = *link;

        if ((current->transport != NULL) && (current->transport->fd == fd)) {
            *link = current->next;
            current->next = NULL;
            server_connection_destroy(current); /* stops the worker, which frees the node */
            return NGL_TRUE;
        }

        link = &current->next;
    }

    return NGL_FALSE;
}

/**
 * Thread entry point that receives data for one connection.
 *
 * Waits with select() for the connection's descriptor to become readable,
 * reads the data and feeds it to the connection's message parser. The loop
 * ends when the run flag is cleared, the peer closes the connection, or a
 * non-recoverable error occurs. On exit the thread releases the transport, the
 * parser and the connection object; this thread owns them.
 *
 * @param opaque the ngl_connection_t to service.
 * @return always NULL.
 */
static inline void* server_connection_receiver_worker(void* opaque) {
    ngl_connection_t* connection = (ngl_connection_t*)opaque;
    ngl_transport_t* transport = connection->transport;
    int32_t socket_id = transport->fd;
    uint8_t* recv_buf = NULL;

    /* FD_SET() on a descriptor outside [0, FD_SETSIZE) is undefined behavior. */
    if ((socket_id < 0) || (socket_id >= FD_SETSIZE)) {
        printf("server connection fd(%d) is out of select() range\n", (int)socket_id);
        server_connection_remove(connection->server_ref, socket_id);
        goto cleanup;
    }

    recv_buf = (uint8_t*)malloc(SERVER_RECV_BUF_SIZE);

    if (!recv_buf) {
        /* Still go through the common cleanup, otherwise the connection and
         * its transport would leak and stay linked in the server's list. */
        server_connection_remove(connection->server_ref, socket_id);
        goto cleanup;
    }

    while (connection->is_running == NGL_TRUE) {
        /* select() overwrites the descriptor set with the ready descriptors
         * and may modify the timeout, so both are re-armed on every pass. */
        fd_set read_set;
        struct timeval select_timeout;

        FD_ZERO(&read_set);
        FD_SET(socket_id, &read_set);
        select_timeout.tv_sec = 0;
        select_timeout.tv_usec = SERVER_SELECT_TIMEOUT_MS * 1000;

        int32_t ret = select(socket_id + 1, &read_set, NULL, NULL, &select_timeout);

        if (ret < 0) {
            int select_errno = errno; /* printf() below may change errno */

            if (select_errno == EINTR) {
                continue;
            }

            printf("server select error: %s\n", strerror(select_errno));
            server_connection_remove(connection->server_ref, socket_id);
            break;
        }

        if ((ret == 0) || !FD_ISSET(socket_id, &read_set)) {
            continue; /* timeout: re-check the run flag */
        }

        int32_t recev_size = ngl_transport_read(transport, recv_buf, SERVER_RECV_BUF_SIZE);

        if (recev_size > 0) {
            ngl_msg_parser_fill_data(&connection->parser, recv_buf, recev_size);
        } else if (recev_size == 0) {
            /* zero bytes read is treated as the peer having closed the connection */
            server_connection_remove(connection->server_ref, socket_id);
            break;
        } else {
            int read_errno = errno; /* printf() below may change errno */

            printf("server receive data error: %s\n", strerror(read_errno));

            if (read_errno != EINTR) {
                server_connection_remove(connection->server_ref, socket_id);
                break;
            }
        }
    }

cleanup:
    ngl_transport_free(connection->transport);
    ngl_msg_parser_deinitialize(&connection->parser);
    free(connection);
    free(recv_buf);

    return NULL;
}

/**
 * Parser callback invoked for every message parsed on a connection.
 *
 * Hands the message to the server's on_receive callback together with the
 * server's user_opaque and the connection handle. When no callback is
 * installed the message is destroyed here.
 *
 * @param connection_opaque the ngl_connection_t the message arrived on.
 * @param msg               the parsed message.
 */
static void server_connection_on_receive(void* connection_opaque, ngl_message_t* msg) {
    ngl_connection_t* conn = (ngl_connection_t*)connection_opaque;
    ngl_server_t* base = &conn->server_ref->supper;

    if (!base->on_receive) {
        /* nobody to deliver to: drop the message */
        ngl_message_destroy(msg);
        return;
    }

    base->on_receive(base->user_opaque, connection_opaque, msg);
}

/**
 * Create a connection for an accepted transport, append it to the server's
 * list and start its detached receiver thread.
 *
 * @param server    server that accepted the client.
 * @param transport transport of the accepted client. It is stored in the
 *                  connection on success; on failure it is not touched and the
 *                  caller remains responsible for it.
 * @return NGL_TRUE on success, NGL_FALSE on allocation or thread failure.
 *
 * NOTE(review): on the failure paths the parser is not deinitialized; confirm
 * whether ngl_msg_parser_initialize() acquires resources that need releasing.
 */
static ngl_bool_t server_connection_add(general_server_t* server, ngl_transport_t* transport) {
    ngl_connection_t* connection = (ngl_connection_t*)calloc(1, sizeof(*connection));
    if (!connection) {
        return NGL_FALSE;
    }

    connection->transport = transport;
    connection->server_ref = server;
    connection->is_running = NGL_TRUE;
    ngl_msg_parser_initialize(&connection->parser, server_connection_on_receive, connection);

    pthread_attr_t type;

    /* Set the thread attributes */
    if (pthread_attr_init(&type) != 0) {
        printf("Couldn't initialize pthread attributes");
        free(connection);
        return NGL_FALSE;
    }

    pthread_attr_setdetachstate(&type, PTHREAD_CREATE_DETACHED);

    /* Link the connection before its thread exists. The worker unlinks and
     * frees the connection by itself, so linking afterwards could append a
     * connection that the worker has already released. */
    ngl_connection_t** tail = &server->connections;
    while (*tail != NULL) {
        tail = &(*tail)->next;
    }
    *tail = connection;

    int32_t ret = pthread_create(&connection->on_receiver_worker, &type, server_connection_receiver_worker, (void*)connection);
    pthread_attr_destroy(&type); /* the attribute object is only needed for the create call */

    if (ret != 0) {
        /* pthread_create() reports failure through its return value, not errno */
        printf("create pthread thread failed(%s)\n", strerror(ret));

        /* undo the link made above */
        for (ngl_connection_t** link = &server->connections; *link != NULL; link = &(*link)->next) {
            if (*link == connection) {
                *link = connection->next;
                break;
            }
        }

        free(connection);
        return NGL_FALSE;
    }

    return NGL_TRUE;
}

/**
 * Look up the connection whose transport uses descriptor @p fd.
 *
 * @param server server owning the connection list.
 * @param fd     descriptor to search for.
 * @return the matching connection, or NULL when there is none.
 */
static ngl_connection_t* server_connection_find(general_server_t* server, int32_t fd) {
    for (ngl_connection_t* node = server->connections; node != NULL; node = node->next) {
        if (node->transport == NULL) {
            continue; /* entry without a transport can never match */
        }

        if (node->transport->fd == fd) {
            return node;
        }
    }

    return NULL;
}

/**
 * Write a complete serialized message to one connection.
 *
 * Calls ngl_transport_write() until all @p msg_size bytes have been accepted.
 *
 * @param connection destination connection.
 * @param msg_buf    serialized message bytes.
 * @param msg_size   number of bytes in @p msg_buf; nothing is written when it
 *                   is not positive.
 * @return NGL_ErrOK when everything was written, NGL_ErrUndefined when the
 *         transport reports an error (negative return).
 *
 * NOTE(review): a transport that keeps returning 0 would keep this loop
 * spinning; confirm the contract of ngl_transport_write().
 */
static ngl_error_t server_connection_send(ngl_connection_t* connection, const uint8_t* msg_buf, int32_t msg_size) {
    int32_t send_size = 0;

    while (send_size < msg_size) {
        /* Continue after the bytes already written. Restarting from msg_buf
         * after a partial write would retransmit the start of the message. */
        int32_t ret = ngl_transport_write(connection->transport, msg_buf + send_size, (msg_size - send_size));
        if (ret < 0) {
            return NGL_ErrUndefined;
        }
        send_size += ret;
    }

    return NGL_ErrOK;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
static inline void* general_server_process_client_connection(void* opaque) {
    general_server_t* server = (general_server_t*)opaque;
    ngl_server_t* supper = &server->supper;
    while (server->is_running == NGL_TRUE) {
        ngl_transport_t* transport = ngl_transport_process_client_connection(supper->transport, SERVER_SELECT_TIMEOUT_MS);
        if (transport) {
            if (server_connection_add(server, transport) == NGL_FALSE) {
                ngl_transport_free(transport);
            }
        }
    }

    return NULL;
}

/**
 * Start the server by launching the joinable thread that accepts clients.
 *
 * @param self the server (a general_server_t).
 * @return NGL_ErrOK on success, NGL_ErrInvalidStatus when the accept thread
 *         already exists, NGL_ErrInsufficientResources when the thread
 *         attributes or the thread itself could not be created.
 */
static ngl_error_t general_server_start(ngl_server_t* self) {
    general_server_t* server = (general_server_t*)self;
    if (server->on_connect_worker) {
        return NGL_ErrInvalidStatus;
    }

    pthread_attr_t type;

    /* Set the thread attributes */
    if (pthread_attr_init(&type) != 0) {
        printf("Couldn't initialize pthread attributes");
        return NGL_ErrInsufficientResources;
    }

    pthread_attr_setdetachstate(&type, PTHREAD_CREATE_JOINABLE);
    server->is_running = NGL_TRUE; /* must be set before the thread starts polling it */

    int32_t ret = pthread_create(&server->on_connect_worker, &type, general_server_process_client_connection, (void*)server);
    pthread_attr_destroy(&type); /* the attribute object is only needed for the create call */

    if (ret != 0) {
        /* pthread_create() reports failure through its return value, not errno */
        printf("create pthread thread failed(%s)\n", strerror(ret));

        /* Roll back so the server still reads as "not started": the handle is
         * unspecified after a failed create and is used as the started flag. */
        server->is_running = NGL_FALSE;
        server->on_connect_worker = (pthread_t)0;
        return NGL_ErrInsufficientResources;
    }

    return NGL_ErrOK;
}

/**
 * Stop the server: clear the run flag and wait for the accept thread to end.
 *
 * Client connections are not touched by this function.
 *
 * @param self the server (a general_server_t).
 * @return NGL_ErrOK once the accept thread has been joined,
 *         NGL_ErrInvalidStatus when the server was not started.
 */
static ngl_error_t general_server_stop(ngl_server_t* self) {
    general_server_t* server = (general_server_t*)self;
    if (!server->on_connect_worker) {
        return NGL_ErrInvalidStatus;
    }

    server->is_running = NGL_FALSE;
    pthread_join(server->on_connect_worker, NULL);

    /* Forget the handle: joining the same thread twice is undefined, and a
     * zero handle is what start()/stop() use to detect "not started". */
    server->on_connect_worker = (pthread_t)0;

    return NGL_ErrOK;
}

/**
 * Send a message to one connection, or broadcast it to every connection.
 *
 * @param self   the server; only required for a broadcast.
 * @param opaque connection handle (as passed to the on_receive callback) to
 *               send to, or NULL to broadcast to all connections.
 *               NOTE(review): presumably NGL_SERVER_MSG_BROADCAST_OPAQUE is
 *               NULL, since the code selects broadcast on a NULL opaque.
 * @param msg    message to send.
 * @return NGL_ErrInvalidArgument when @p msg is NULL, or @p self is NULL for a
 *         broadcast; NGL_ErrInsufficientResources when the message has no
 *         serialized data; the result of the send for a single connection;
 *         NGL_ErrOK for a broadcast (per-connection failures are ignored).
 */
static ngl_error_t general_server_send(ngl_server_t* self, void* opaque, ngl_message_t* msg) {
    if (!msg) {
        return NGL_ErrInvalidArgument;
    }

    /* The size is reported through a uint16_t*. Use an object of exactly that
     * type: writing 16 bits into an int32_t through a cast pointer is an
     * aliasing violation and yields a wrong value on big-endian targets. */
    uint16_t serialized_size = 0;
    const uint8_t* msg_buf = ngl_message_serialized_buffer(msg, &serialized_size);

    if (!msg_buf || (serialized_size == 0)) {
        return NGL_ErrInsufficientResources;
    }

    int32_t msg_size = (int32_t)serialized_size;

    if (opaque) {
        ngl_connection_t* connection = (ngl_connection_t*)opaque;
        return server_connection_send(connection, msg_buf, msg_size);
    }

    if (!self) {
        return NGL_ErrInvalidArgument;
    }

    general_server_t* server = (general_server_t*)self;

    /* broadcast: best effort, a failure on one connection does not stop the rest */
    for (ngl_connection_t* connection = server->connections; connection != NULL; connection = connection->next) {
        ngl_error_t ret = server_connection_send(connection, msg_buf, msg_size);
        if (ret != NGL_ErrOK) {
            //todo refine me??
        }
    }

    return NGL_ErrOK;
}

/**
 * Release a general server.
 *
 * Stops the accept thread, asks every connection in the list to stop and then
 * hands the object to ngl_server_free_method().
 *
 * @param self the server (a general_server_t).
 */
static void general_server_free_fn(ngl_server_t* self) {
    general_server_t* server = (general_server_t*)self;

    general_server_stop(self);

    ngl_connection_t* following = NULL;
    for (ngl_connection_t* node = server->connections; node != NULL; node = following) {
        /* fetch the successor first, before the node is asked to stop */
        following = node->next;
        server_connection_destroy(node);
    }

    ngl_server_free_method(self);
}


/***************************************************************************
 * API function  implementation
 ***************************************************************************/
/**
 * Create a general server instance.
 *
 * Allocates a zero-initialized general_server_t, initializes its base object
 * with the given transport, callback and user data, and installs the general
 * start/stop/send/free implementations.
 *
 * @param transport   transport handed to ngl_server_init_instance().
 * @param on_receive  callback handed to ngl_server_init_instance().
 * @param user_opaque user data handed to ngl_server_init_instance().
 * @return the new server as its base type, or NULL when allocation fails.
 */
ngl_server_t* ngl_server_general_new(ngl_transport_t* transport, ngl_server_on_receive on_receive, void* user_opaque) {
    general_server_t* server = (general_server_t*)calloc(1, sizeof(*server));
    if (!server) {
        return NULL;
    }

    ngl_server_t* supper = &server->supper;
    ngl_server_init_instance(supper, transport, on_receive, user_opaque);
    supper->start = general_server_start;
    supper->stop = general_server_stop;
    supper->send = general_server_send;
    supper->free_fn = general_server_free_fn;

    /* Return the base object: general_server_t* is not compatible with the
     * declared ngl_server_t* return type (same address, correct type). */
    return supper;
}